#!/bin/sh

. "$SPEC_PATH/abstract/backup"

. "$SPEC_PATH/abstract/sql-scripts"

e2e_test_extra_hash() {
  "$SHELL" "$PROJECT_PATH/stackgres-k8s/ci/build/build-functions.sh" path_hash \
    "$(realpath --relative-to "$PROJECT_PATH" "$SPEC_PATH/abstract/backup")"
  "$SHELL" "$PROJECT_PATH/stackgres-k8s/ci/build/build-functions.sh" path_hash \
    "$(realpath --relative-to "$PROJECT_PATH" "$SPEC_PATH/abstract/sql-scripts")"
  "$SHELL" "$PROJECT_PATH/stackgres-k8s/ci/build/build-functions.sh" path_hash \
    "$(realpath --relative-to "$PROJECT_PATH" "$SPEC_PATH/sql-scripts.sakila.sql")"
}

e2e_test_install() {
  STREAM_NAME="$(get_sgstreams_name "$SPEC_NAME-operation")"
  TARGET_CLUSTER_NAME="$(get_sgstreams_name "$SPEC_NAME-target")"
  POSTGIS_VERSION="$(get_latest_version_of_extension postgis "$E2E_POSTGRES_VERSION" || true)"

  install_minio

  cat << 'EOF' | kubectl create -n "$CLUSTER_NAMESPACE" secret generic sql-scripts-sakila-user \
    --from-literal=create-sakila-user.sql="$(cat)"
DO $$
BEGIN
  IF NOT EXISTS (SELECT * FROM pg_roles WHERE rolname = 'sakila') THEN
    EXECUTE 'CREATE USER sakila WITH PASSWORD ''sakila'';';
  END IF;
END$$;
EOF

  kubectl create -n "$CLUSTER_NAMESPACE" configmap sql-scripts-sakila-schema \
    --from-file=create-sakila-schema.sql="$SPEC_PATH/sql-scripts.sakila.sql"

  create_or_replace_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" 1 \
    --set-string "cluster.managedSql.scripts[0].script=CREATE DATABASE sakila" \
    --set-string "cluster.managedSql.scripts[1].database=sakila" \
    --set-string "cluster.managedSql.scripts[1].scriptFrom.secretKeyRef.name=sql-scripts-sakila-user" \
    --set-string "cluster.managedSql.scripts[1].scriptFrom.secretKeyRef.key=create-sakila-user.sql" \
    --set-string "cluster.managedSql.scripts[2].database=sakila" \
    --set-string "cluster.managedSql.scripts[2].wrapInTransaction=repeatable-read" \
    --set-string "cluster.managedSql.scripts[2].scriptFrom.configMapKeyRef.name=sql-scripts-sakila-schema" \
    --set-string "cluster.managedSql.scripts[2].scriptFrom.configMapKeyRef.key=create-sakila-schema.sql"
  wait_until kubectl -n "$CLUSTER_NAMESPACE" get secret "$CLUSTER_NAME" >/dev/null 2>&1

  create_or_replace_cluster "$TARGET_CLUSTER_NAME" "$CLUSTER_NAMESPACE" 1 \
    --set configurations.create=false \
    --set instanceProfiles=null \
    --set-string cluster.postgres.extensions[0].name=postgis \
    --set-string "cluster.postgres.extensions[0].version=$POSTGIS_VERSION" \
    --set-string "cluster.managedSql.scripts[0].script=CREATE EXTENSION postgis" \
    --set-string "cluster.managedSql.scripts[1].script=CREATE DATABASE sakila"

  deploy_curl_pod "$CLUSTER_NAMESPACE"

  wait_pods_running "$CLUSTER_NAMESPACE" 4
  wait_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
  wait_cluster "$TARGET_CLUSTER_NAME" "$CLUSTER_NAMESPACE"
}

e2e_test() {
  if [ -z "$POSTGIS_VERSION" ]
  then
    echo "Skipping stream-to-cluster since postgis not available for $(uname -m)"
    return
  fi

  run_test "Checking that stream is working skipping DDL import" check_stream_is_working_skippig_ddl_import

  run_test "Checking that stream is working" check_stream_is_working

  run_test "Checking that stream copying schema first is working" check_stream_copy_schema_is_working

  run_test "Checking that stream with incremental snapshots is working" check_stream_incremental_snapshots_is_working
}

check_stream_is_working_skipping_ddl_import() {
  check_stream_is_working true
}

check_stream_is_working() {
  SKIP_DDL_IMPORT="${1:-false}"
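  # List every visible scalar type (plus the serial pseudo-types) with its
  # category, kind and canonical name; a complex_<type> table is created below
  # for each one to exercise snapshot and streaming across all of them.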
  cat << 'EOF' | tee "$LOG_PATH/list-types.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on -tA > "$LOG_PATH/types"
SELECT typcategory || ' ' || typtype || ' ' || typname || ' ' || typformattype
FROM (
  SELECT
    t.typname AS typname,
    pg_catalog.format_type(t.oid, NULL) AS typformattype,
    text(t.typcategory) AS typcategory,
    text(t.typtype) AS typtype
  FROM pg_catalog.pg_type t LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
  WHERE (t.typrelid = 0 OR (SELECT c.relkind = 'c' FROM pg_catalog.pg_class c WHERE c.oid = t.typrelid))
    AND NOT EXISTS(SELECT 1 FROM pg_catalog.pg_type el WHERE el.oid = t.typelem AND el.typarray = t.oid)
    AND pg_catalog.pg_type_is_visible(t.oid)
    AND t.typtype NOT IN ('p')
    AND t.typcategory NOT IN ('Z')
    AND t.typname NOT LIKE 'reg%'
    AND t.typname != 'int2vector' -- See https://stackoverflow.com/a/74612592
    AND t.typname != 'oidvector' -- See https://stackoverflow.com/a/74612592
    AND t.typname != 'gtsvector' -- See https://doxygen.postgresql.org/tsgistidx_8c_source.html#l00094
    AND t.typname != 'refcursor' -- See https://www.postgresql.org/docs/current/plpgsql-cursors.html
    AND t.typname != 'pg_dependencies' -- See https://github.com/postgres/postgres/blob/035f99cbebe5ffcaf52f8370394446cd59621ab7/src/backend/statistics/dependencies.c#L646-L664
    AND t.typname != 'pg_mcv_list' -- See https://github.com/postgres/postgres/blob/master/src/backend/statistics/README.mcv
    AND t.typname != 'pg_ndistinct' -- See https://www.postgresql.org/message-id/MN2PR05MB68795FCDB5B560D350084753B6FA9%40MN2PR05MB6879.namprd05.prod.outlook.com
    AND t.typname != 'pg_node_tree' -- See https://www.postgresql.org/message-id/20595.1347653162%40sss.pgh.pa.us
  UNION ALL
  SELECT
    (CASE
      WHEN t.typname = 'int2' THEN 'smallserial'
      WHEN t.typname = 'int4' THEN 'serial'
      ELSE 'bigserial'
      END) AS typname,
    pg_catalog.format_type(t.oid, NULL) AS typformattype,
    text(t.typcategory) AS typcategory,
    text(t.typtype) AS typtype
  FROM pg_catalog.pg_type t LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
  WHERE t.typname IN ('int2', 'int4', 'int8')) _
ORDER BY typname;
EOF

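  # Reset the target cluster so reruns start clean: clear the DDL-import
  # marker and drop every table this check creates.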
  cat << EOF | tee "$LOG_PATH/reset-target-status.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$TARGET_CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on
ALTER DATABASE postgres SET sgstream.ddl_import_completed = false;
DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS pop;
$(
set +x
cat "$LOG_PATH/types" | while read -r TYPE_CATEGORY TYPE_TYPE TYPE_NAME TYPE_FORMAT_TYPE
do
  cat << INNER_EOF
DROP TABLE IF EXISTS complex_$TYPE_NAME;
INNER_EOF
done
)
EOF

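  # TYPE_COUNT feeds the expected transaction and event counts below: the
  # test table plus TYPE_COUNT complex_* tables all receive the same traffic.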
  TYPE_COUNT="$(wc -l "$LOG_PATH/types" | cut -d ' ' -f 1)"
  cat << EOF | tee "$LOG_PATH/init-tables.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on
DROP TABLE IF EXISTS test;
CREATE TABLE test(i bigint, t text, PRIMARY KEY(i));
DROP TABLE IF EXISTS pop;

INSERT INTO test SELECT i, 'test' FROM generate_series(1, 3) AS i ON CONFLICT (i) DO UPDATE SET t=EXCLUDED.t;

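-- Nested dollar quoting: \$\$ delimits the DO block, \$execute\$ the string
-- run via EXECUTE, and \$sql\$ the body of the generated function, so the DDL
-- assembled by the surrounding shell loop needs no extra quote escaping.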
DO \$\$BEGIN
EXECUTE \$execute\$CREATE OR REPLACE FUNCTION create_complex_table() RETURNS void AS \$sql\$
$(
set +x
cat "$LOG_PATH/types" | while read -r TYPE_CATEGORY TYPE_TYPE TYPE_NAME TYPE_FORMAT_TYPE
do
  cat << INNER_EOF
DROP TABLE IF EXISTS complex_$TYPE_NAME;
$(
if is_array_not_supported "$TYPE_CATEGORY" "$TYPE_TYPE" "$TYPE_NAME" "$TYPE_FORMAT_TYPE"
then
  cat << TABLE_EOF
CREATE TABLE complex_$TYPE_NAME(i bigint,\$execute\$ || quote_ident('c_$TYPE_NAME') || ' $TYPE_NAME NOT NULL' || \$execute\$, updated boolean DEFAULT false, PRIMARY KEY (i));
TABLE_EOF
else
  cat << TABLE_EOF
CREATE TABLE complex_$TYPE_NAME(i bigint,\$execute\$ || quote_ident('c_$TYPE_NAME') || ' $TYPE_NAME NOT NULL' || ',' || quote_ident('c_$TYPE_NAME') || '_array $TYPE_NAME[] NOT NULL' || \$execute\$, updated boolean DEFAULT false, PRIMARY KEY (i));
TABLE_EOF
fi
)
INNER_EOF
done
)
\$sql\$ LANGUAGE sql\$execute\$;
END\$\$;

DO \$\$BEGIN
EXECUTE \$execute\$CREATE OR REPLACE FUNCTION insert_complex(i bigint) RETURNS void LANGUAGE plpgsql AS \$plpgsql\$BEGIN
$(
set +x
cat "$LOG_PATH/types" | while read -r TYPE_CATEGORY TYPE_TYPE TYPE_NAME TYPE_FORMAT_TYPE
do
  cat << INNER_EOF
  EXECUTE \$insert\$INSERT INTO complex_$TYPE_NAME
    SELECT c1, c2 $(is_array_not_supported "$TYPE_CATEGORY" "$TYPE_TYPE" "$TYPE_NAME" "$TYPE_FORMAT_TYPE" || printf ', c3')
    FROM (
    SELECT NULL AS c1, NULL AS c2, NULL AS c3 WHERE 1 = 0
    -- Returns an empty row set with predefined column names
    UNION ALL
    SELECT \$insert\$ || i || \$insert\$,
    $(
    if [ "$TYPE_NAME" = 'aclitem' ]
    then
      printf %s "makeaclitem('postgres'::regrole, 'authenticator'::regrole, 'SELECT', FALSE)::text::$TYPE_FORMAT_TYPE,null"
    elif [ "$TYPE_NAME" = 'json' ] || [ "$TYPE_NAME" = 'jsonb' ]
    then
      printf %s "('\"' || (SELECT string_agg(i::text, '') FROM generate_series(1,1000) i) || '\"')::$TYPE_FORMAT_TYPE,"
      printf %s "ARRAY[('\"' || (SELECT string_agg(i::text, '') FROM generate_series(1,1000) i) || '\"')::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'jsonpath' ]
    then
      printf %s "('\$.b' || (SELECT string_agg(i::text, '') FROM generate_series(1,1000) i))::$TYPE_FORMAT_TYPE,"
      printf %s "ARRAY[('\$.b' || (SELECT string_agg(i::text, '') FROM generate_series(1,1000) i))::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'macaddr' ]
    then
      printf %s "'08:00:2b:01:02:03'::$TYPE_FORMAT_TYPE, ARRAY['08:00:2b:01:02:03'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'macaddr8' ]
    then
      printf %s "'08:00:2b:01:02:03:04:05'::$TYPE_FORMAT_TYPE,ARRAY['08:00:2b:01:02:03:04:05'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'pg_lsn' ]
    then
      printf %s "'FFFFFFFF/FFFFFFFF'::$TYPE_FORMAT_TYPE,ARRAY['FFFFFFFF/FFFFFFFF'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'pg_snapshot' ]
    then
      printf %s "txid_current_snapshot()::text::$TYPE_FORMAT_TYPE,ARRAY[txid_current_snapshot()::text::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'txid_snapshot' ]
    then
      printf %s "'10:20:10,14,15'::$TYPE_FORMAT_TYPE,ARRAY['10:20:10,14,15'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'uuid' ]
    then
      printf %s "'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::$TYPE_FORMAT_TYPE,ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'datemultirange' ]
    then
      printf %s "'{(,)}'::$TYPE_FORMAT_TYPE,ARRAY['{(,)}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'daterange' ]
    then
      printf %s "'(,)'::$TYPE_FORMAT_TYPE,ARRAY['(,)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'int4multirange' ]
    then
      printf %s "'{[1,2), [3,4)}'::$TYPE_FORMAT_TYPE,ARRAY['{[1,2), [3,4)}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'int4range' ]
    then
      printf %s "'[2,4)'::$TYPE_FORMAT_TYPE,ARRAY['[2,4)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'int8multirange' ]
    then
      printf %s "'{[4,12)}'::$TYPE_FORMAT_TYPE,ARRAY['{[4,12)}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'int8range' ]
    then
      printf %s "'(3,7)'::$TYPE_FORMAT_TYPE,ARRAY['(3,7)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'nummultirange' ]
    then
      printf %s "'{[1.1,2.2)}'::$TYPE_FORMAT_TYPE,ARRAY['{[1.1,2.2)}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'numrange' ]
    then
      printf %s "'(1.1,2.2)'::$TYPE_FORMAT_TYPE,ARRAY['(1.1,2.2)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'tsmultirange' ] || [ "$TYPE_NAME" = 'tstzmultirange' ]
    then
      printf %s "'{[2011-01-01,2011-03-01)}'::$TYPE_FORMAT_TYPE,ARRAY['{[2011-01-01,2011-03-01)}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'tsrange' ] || [ "$TYPE_NAME" = 'tstzrange' ]
    then
      printf %s "'[2011-01-01,2011-03-01)'::$TYPE_FORMAT_TYPE,ARRAY['[2011-01-01,2011-03-01)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'dblink_pkey_results' ]
    then
      printf %s "'(1,2)'::$TYPE_FORMAT_TYPE,ARRAY['(1,2)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'line' ]
    then
      printf %s "'{1,2,3}'::$TYPE_FORMAT_TYPE,ARRAY['{1,2,3}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'tid' ] || [ "$TYPE_NAME" = 'point' ]
    then
      printf %s "'(1,2)'::$TYPE_FORMAT_TYPE,ARRAY['(1,2)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'circle' ]
    then
      printf %s "'<(1,2),3>'::$TYPE_FORMAT_TYPE,ARRAY['<(1,2),3>'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'lseg' ] || [ "$TYPE_NAME" = 'box' ] || [ "$TYPE_NAME" = 'path' ] || [ "$TYPE_NAME" = 'polygon' ]
    then
      printf %s "'((1,2),(3,4))'::$TYPE_FORMAT_TYPE,ARRAY['((1,2),(3,4))'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_CATEGORY" = 'I' ]
    then
      printf %s "'1.2.3.4'::$TYPE_FORMAT_TYPE,ARRAY['1.2.3.4'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_CATEGORY" = 'D' ]
    then
      printf %s "NOW()::text::$TYPE_FORMAT_TYPE,ARRAY[NOW()::text::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_CATEGORY" = 'Z' ]
    then
      printf %s "'f'::$TYPE_FORMAT_TYPE,ARRAY['f'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_TYPE" = 'r' ] || [ "$TYPE_TYPE" = 'm' ] || [ "$TYPE_CATEGORY" = 'A' ]
    then
      printf %s "'array[]'::$TYPE_FORMAT_TYPE,ARRAY['{}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_CATEGORY" = 'N' ] || [ "$TYPE_CATEGORY" = 'V' ] || [ "$TYPE_CATEGORY" = 'T' ] || [ "$TYPE_NAME" = 'cid' ] || [ "$TYPE_NAME" = 'xid' ] || [ "$TYPE_NAME" = 'xid8' ]
    then
      printf %s "'1'::$TYPE_FORMAT_TYPE,ARRAY['1'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'bool' ]
    then
      printf %s "'t'::$TYPE_FORMAT_TYPE,ARRAY['t'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'tsquery' ] || [ "$TYPE_NAME" = 'tsvector' ]
    then
      printf %s "'t'::$TYPE_FORMAT_TYPE,ARRAY['t'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    else
      printf %s "(SELECT string_agg(i::text, '') FROM generate_series(1,1000) i)::$TYPE_FORMAT_TYPE,ARRAY[(SELECT string_agg(i::text, '') FROM generate_series(1,1000) i)::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    fi
    )
    )\$insert\$;
INNER_EOF
done
)
  END\$plpgsql\$;\$execute\$;
END\$\$;

SELECT create_complex_table();
SELECT insert_complex(i) FROM generate_series(1, 3) AS i;
EOF

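  # When skipping DDL import the source is declared as a plain Postgres
  # connection with credentials read from the cluster secret; otherwise the
  # SGCluster source type lets the operator resolve host and credentials.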
  cat << EOF | tee "$LOG_PATH/sgstream-working.yaml" | kubectl replace --force -f -
apiVersion: stackgres.io/v1alpha1
kind: SGStream
metadata:
  namespace: $CLUSTER_NAMESPACE
  name: "$STREAM_NAME"
spec:
  source:
$(
  if [ "$SKIP_DDL_IMPORT" = true ]
  then
    cat << INNER_EOF
    type: Postgres
    postgres:
      host: "$CLUSTER_NAME"
      port: 5433
      username:
        name: "$CLUSTER_NAME"
        key: superuser-username
      password:
        name: "$CLUSTER_NAME"
        key: superuser-password
      debeziumProperties:
INNER_EOF
  else
    cat << INNER_EOF
    type: SGCluster
    sgCluster:
      name: "$CLUSTER_NAME"
      debeziumProperties:
INNER_EOF
  fi
)
  target:
    type: SGCluster
    sgCluster:
      name: "$TARGET_CLUSTER_NAME"
      skipDdlImport: $SKIP_DDL_IMPORT
      debeziumProperties:
        batchSize: 1
  pods:
    persistentVolume:
      size: 1Gi
  debeziumEngineProperties:
EOF

  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.snapshot.snapshotCompleted | grep -qxF true'
  then
    success "snapshot completed"
  else
    fail "snapshot did not completed"
  fi

  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq '.status.snapshot.rowsScanned["public.test"]' | grep -qxF 3
  then
    success "test table scanned"
  else
    fail "test table not scanned"
  fi

  cat "$LOG_PATH/types" | while read -r TYPE_CATEGORY TYPE_TYPE TYPE_NAME TYPE_FORMAT_TYPE
  do
    if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq '.status.snapshot.rowsScanned["public.complex_'"$TYPE_NAME"'"]' | grep -qxF 3
    then
      success "complex_$TYPE_NAME table scanned"
    else
      fail "complex_$TYPE_NAME table not scanned"
    fi
  done

  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.connected | grep -qxF true'
  then
    success "streaming started"
  else
    fail "streaming not started"
  fi

  cat << 'EOF' | tee "$LOG_PATH/insert-tables.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on
INSERT INTO test SELECT * FROM generate_series(4, 6);

SELECT insert_complex(i) FROM generate_series(4, 6) AS i;
EOF

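  # Judging by the expected counts below: "sent" events also include the
  # snapshot rows, while the "seen" counters only cover the streaming phase,
  # and each delete is later followed by an extra tombstone record.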
  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.numberOfCommittedTransactions | grep -qxF 2'
  then
    success "streaming insert transaction successful"
  else
    fail "streaming insert transaction failed"
  fi

  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfCreateEventsSeen | grep -qxF "$(( (TYPE_COUNT + 1) * 3 ))" \
    && kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfUpdateEventsSeen | grep -qxF 0 \
    && kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfDeleteEventsSeen | grep -qxF 0
  then
    success "streaming insert events successful"
  else
    fail "streaming insert events failed"
  fi

  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfEventsSeen | grep -qxF "$(( (TYPE_COUNT + 1) * 3 ))" \
    && wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.events.totalNumberOfEventsSent | grep -qxF "$(( (TYPE_COUNT + 1) * 6 ))"'
  then
    success "sent insert events successful"
  else
    fail "sent insert events failed"
  fi

  cat << EOF | tee "$LOG_PATH/delete-tables.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on
DELETE FROM test WHERE i = 1;

$(
set +x
cat "$LOG_PATH/types" | while read -r TYPE_CATEGORY TYPE_TYPE TYPE_NAME TYPE_FORMAT_TYPE
do
  cat << INNER_EOF
DELETE FROM "complex_$TYPE_NAME" WHERE i = 1;
INNER_EOF
done
)
EOF

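  # psql autocommits, so each data statement above counts as one committed
  # transaction; judging by the expected values, DDL statements (used later in
  # the alter script) do not increment this counter.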
  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.numberOfCommittedTransactions | grep -qxF "$(( 3 + TYPE_COUNT ))"'
  then
    success "streaming delete transaction successful"
  else
    fail "streaming delete transaction failed"
  fi

  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfCreateEventsSeen | grep -qxF "$(( (TYPE_COUNT + 1) * 3 ))" \
    && kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfUpdateEventsSeen | grep -qxF 0 \
    && kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfDeleteEventsSeen | grep -qxF "$(( (TYPE_COUNT + 1) ))"
  then
    success "streaming delete events successful"
  else
    fail "streaming delete events failed"
  fi

  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfEventsSeen | grep -qxF "$(( (TYPE_COUNT + 1) * 4 ))" \
    && wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.events.totalNumberOfEventsSent | grep -qxF "$(( (TYPE_COUNT + 1) * 8 ))"'
  then
    success "sent delete events successful"
  else
    fail "sent delete events failed"
  fi

  cat << EOF | tee "$LOG_PATH/update-tables.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on
UPDATE test SET t = 'hello' WHERE i = 6;

$(
set +x
cat "$LOG_PATH/types" | while read -r TYPE_CATEGORY TYPE_TYPE TYPE_NAME TYPE_FORMAT_TYPE
do
  cat << INNER_EOF
UPDATE "complex_$TYPE_NAME" SET
  ("c_${TYPE_NAME}" $(is_array_not_supported "$TYPE_CATEGORY" "$TYPE_TYPE" "$TYPE_NAME" "$TYPE_FORMAT_TYPE" || printf %s ", \"c_${TYPE_NAME}_array\"")) =
  (
    SELECT c1 $(is_array_not_supported "$TYPE_CATEGORY" "$TYPE_TYPE" "$TYPE_NAME" "$TYPE_FORMAT_TYPE" || printf ', c2')
    FROM (
    SELECT NULL AS c1, NULL AS c2 WHERE 1 = 0
    -- Returns an empty row set with predefined column names
    UNION ALL
    SELECT
    $(
    if [ "$TYPE_NAME" = 'aclitem' ]
    then
      printf %s "makeaclitem('postgres'::regrole, 'authenticator'::regrole, 'UPDATE', FALSE)::text::$TYPE_FORMAT_TYPE",null
    elif [ "$TYPE_NAME" = 'json' ] || [ "$TYPE_NAME" = 'jsonb' ]
    then
      printf %s "('\"' || (SELECT string_agg(i::text, '') FROM generate_series(2,1001) i) || '\"')::$TYPE_FORMAT_TYPE,"
      printf %s "ARRAY[('\"' || (SELECT string_agg(i::text, '') FROM generate_series(2,1001) i) || '\"')::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'jsonpath' ]
    then
      printf %s "('\$.b' || (SELECT string_agg(i::text, '') FROM generate_series(2,1001) i))::$TYPE_FORMAT_TYPE,"
      printf %s "ARRAY[('\$.b' || (SELECT string_agg(i::text, '') FROM generate_series(2,1001) i))::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'macaddr' ]
    then
      printf %s "'09:00:2b:01:02:03'::$TYPE_FORMAT_TYPE, ARRAY['09:00:2b:01:02:03'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'macaddr8' ]
    then
      printf %s "'09:00:2b:01:02:03:04:05'::$TYPE_FORMAT_TYPE,ARRAY['09:00:2b:01:02:03:04:05'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'pg_lsn' ]
    then
      printf %s "'0/0'::$TYPE_FORMAT_TYPE,ARRAY['0/0'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'pg_snapshot' ]
    then
      printf %s "txid_current_snapshot()::text::$TYPE_FORMAT_TYPE,ARRAY[txid_current_snapshot()::text::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'txid_snapshot' ]
    then
      printf %s "'20:30:20,24,25'::$TYPE_FORMAT_TYPE,ARRAY['20:30:20,24,25'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'uuid' ]
    then
      printf %s "'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::$TYPE_FORMAT_TYPE,ARRAY['b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'datemultirange' ]
    then
      printf %s "'{(,)}'::$TYPE_FORMAT_TYPE,ARRAY['{(,)}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'daterange' ]
    then
      printf %s "'(,)'::$TYPE_FORMAT_TYPE,ARRAY['(,)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'int4multirange' ]
    then
      printf %s "'{[2,3), [4,5)}'::$TYPE_FORMAT_TYPE,ARRAY['{[2,3), [4,5)}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'int4range' ]
    then
      printf %s "'[3,5)'::$TYPE_FORMAT_TYPE,ARRAY['[3,5)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'int8multirange' ]
    then
      printf %s "'{[5,13)}'::$TYPE_FORMAT_TYPE,ARRAY['{[5,13)}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'int8range' ]
    then
      printf %s "'(4,8)'::$TYPE_FORMAT_TYPE,ARRAY['(4,8)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'nummultirange' ]
    then
      printf %s "'{[2.2,3.3)}'::$TYPE_FORMAT_TYPE,ARRAY['{[2.2,3.3)}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'numrange' ]
    then
      printf %s "'(2.2,3.3)'::$TYPE_FORMAT_TYPE,ARRAY['(2.2,3.3)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'tsmultirange' ] || [ "$TYPE_NAME" = 'tstzmultirange' ]
    then
      printf %s "'{[2011-01-02,2011-03-02)}'::$TYPE_FORMAT_TYPE,ARRAY['{[2011-01-02,2011-03-02)}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'tsrange' ] || [ "$TYPE_NAME" = 'tstzrange' ]
    then
      printf %s "'[2011-01-02,2011-03-02)'::$TYPE_FORMAT_TYPE,ARRAY['[2011-01-02,2011-03-02)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'dblink_pkey_results' ]
    then
      printf %s "'(2,3)'::$TYPE_FORMAT_TYPE,ARRAY['(2,3)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'line' ]
    then
      printf %s "'{2,3,4}'::$TYPE_FORMAT_TYPE,ARRAY['{2,3,4}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'tid' ] || [ "$TYPE_NAME" = 'point' ]
    then
      printf %s "'(2,3)'::$TYPE_FORMAT_TYPE,ARRAY['(2,3)'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'circle' ]
    then
      printf %s "'<(2,3),4>'::$TYPE_FORMAT_TYPE,ARRAY['<(2,3),4>'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'lseg' ] || [ "$TYPE_NAME" = 'box' ] || [ "$TYPE_NAME" = 'path' ] || [ "$TYPE_NAME" = 'polygon' ]
    then
      printf %s "'((2,3),(4,5))'::$TYPE_FORMAT_TYPE,ARRAY['((2,3),(4,5))'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_CATEGORY" = 'I' ]
    then
      printf %s "'2.3.4.5'::$TYPE_FORMAT_TYPE,ARRAY['2.3.4.5'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_CATEGORY" = 'D' ]
    then
      printf %s "NOW()::text::$TYPE_FORMAT_TYPE,ARRAY[NOW()::text::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_CATEGORY" = 'Z' ]
    then
      printf %s "'f'::$TYPE_FORMAT_TYPE,ARRAY['f'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_TYPE" = 'r' ] || [ "$TYPE_TYPE" = 'm' ] || [ "$TYPE_CATEGORY" = 'A' ]
    then
      printf %s "'array[]'::$TYPE_FORMAT_TYPE,ARRAY['{}'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_CATEGORY" = 'N' ] || [ "$TYPE_CATEGORY" = 'V' ] || [ "$TYPE_CATEGORY" = 'T' ] || [ "$TYPE_NAME" = 'cid' ] || [ "$TYPE_NAME" = 'xid' ] || [ "$TYPE_NAME" = 'xid8' ]
    then
      printf %s "'0'::$TYPE_FORMAT_TYPE,ARRAY['0'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'bool' ]
    then
      printf %s "'f'::$TYPE_FORMAT_TYPE,ARRAY['f'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    elif [ "$TYPE_NAME" = 'tsquery' ] || [ "$TYPE_NAME" = 'tsvector' ]
    then
      printf %s "'f'::$TYPE_FORMAT_TYPE,ARRAY['f'::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    else
      printf %s "(SELECT string_agg(i::text, '') FROM generate_series(2,1002) i)::$TYPE_FORMAT_TYPE,ARRAY[(SELECT string_agg(i::text, '') FROM generate_series(2,1002) i)::$TYPE_FORMAT_TYPE]::$TYPE_FORMAT_TYPE[]"
    fi
    )
    ) )
  WHERE i = 6;
INNER_EOF
done
)

$(
set +x
cat "$LOG_PATH/types" | while read -r TYPE_CATEGORY TYPE_TYPE TYPE_NAME TYPE_FORMAT_TYPE
do
  cat << INNER_EOF
UPDATE "complex_$TYPE_NAME" SET updated = true WHERE i = 6;
INNER_EOF
done
)
EOF

  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.numberOfCommittedTransactions | grep -qxF "$(( 4 + TYPE_COUNT * 3 ))"'
  then
    success "streaming update transaction successful"
  else
    fail "streaming update transaction failed"
  fi

  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfCreateEventsSeen | grep -qxF "$(( (TYPE_COUNT + 1) * 3 ))" \
    && kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfUpdateEventsSeen | grep -qxF "$(( TYPE_COUNT + (TYPE_COUNT + 1) ))" \
    && kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfDeleteEventsSeen | grep -qxF "$(( (TYPE_COUNT + 1) ))"
  then
    success "streaming update events successful"
  else
    fail "streaming update events failed"
  fi

  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfEventsSeen | grep -qxF "$(( TYPE_COUNT + (TYPE_COUNT + 1) * 5 ))" \
    && wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.events.totalNumberOfEventsSent | grep -qxF "$(( TYPE_COUNT + (TYPE_COUNT + 1) * 9 ))"'
  then
    success "sent update events successful"
  else
    fail "sent update events failed"
  fi

  cat << 'EOF' | tee "$LOG_PATH/alter-tables.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on
ALTER TABLE test ADD COLUMN n int DEFAULT 0;

ALTER TABLE test DROP COLUMN t;

INSERT INTO test SELECT i, i FROM generate_series(7, 9) AS i;

CREATE TABLE pop(i bigint, t text, PRIMARY KEY(i));

INSERT INTO pop SELECT i, 'test' FROM generate_series(1, 3) AS i;
EOF

  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.numberOfCommittedTransactions | grep -qxF "$(( 6 + TYPE_COUNT * 3 ))"'
  then
    success "streaming alter transaction successful"
  else
    fail "streaming alter transaction failed"
  fi

  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfCreateEventsSeen | grep -qxF "$(( (TYPE_COUNT + 1) * 3 + 6 ))" \
    && kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfUpdateEventsSeen | grep -qxF "$(( TYPE_COUNT + (TYPE_COUNT + 1) ))" \
    && kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfDeleteEventsSeen | grep -qxF "$(( (TYPE_COUNT + 1) ))"
  then
    success "streaming alter events successful"
  else
    fail "streaming alter events failed"
  fi

  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfEventsSeen | grep -qxF "$(( TYPE_COUNT + (TYPE_COUNT + 1) * 5 + 6 ))" \
    && wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.events.totalNumberOfEventsSent | grep -qxF "$(( TYPE_COUNT + (TYPE_COUNT + 1) * 9 + 6 ))"'
  then
    success "sent alter events successful"
  else
    fail "sent alter events failed"
  fi

  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.events.lastEventWasSent | grep -qxF true'
  then
    success "sent last event successful"
  else
    fail "sent last event failed"
  fi

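  # The tombstone annotation signals the stream to flush remaining events and
  # terminate, letting it reach the Completed condition checked below.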
  kubectl annotate sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" debezium-signal.stackgres.io/tombstone=

  if kubectl wait --timeout="${E2E_TIMEOUT}s" sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" --for=condition=Completed
  then
    success "stream has completed"
  else
    fail "stream has not completed"
  fi

  if [ "$SKIP_DDL_IMPORT" != true ]
  then
    local QUERY
    QUERY="$(cat << 'EOF'
DROP TABLE IF EXISTS input; CREATE TEMPORARY TABLE input (line text);
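-- Each dump runs server-side via COPY FROM PROGRAM: base64 -w 0 turns the
-- whole dump into single opaque lines (DELIMITER E'\1' never matches), and
-- the fd 3/4 redirections feed the dump's exit code to the final "exit" so a
-- failing pg_dump aborts the COPY instead of loading a truncated dump.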
COPY input FROM PROGRAM $cmd$sh -c '{ { { pg_dumpall --clean --if-exists --roles-only; echo $? >&3; } | base64 -w 0 >&4; } 3>&1 | { read EXIT_CODE; exit "$EXIT_CODE"; }; } 4>&1'$cmd$ DELIMITER E'\1';
COPY input FROM PROGRAM $cmd$sh -c '{ { { pg_dump --clean --if-exists --schema-only --dbname=postgres --exclude-table="(test|spatial_ref_sys|pg_stat_statements_info|pg_stat_statements|geometry_columns|geography_columns)" --exclude-schema="__migration__" --no-publications --no-subscriptions; echo $? >&3; } | base64 -w 0 >&4; } 3>&1 | { read EXIT_CODE; exit "$EXIT_CODE"; }; } 4>&1'$cmd$ DELIMITER E'\1';
COPY input FROM PROGRAM $cmd$sh -c '{ { { pg_dump --data-only --dbname=postgres --exclude-table="(test|spatial_ref_sys)" --exclude-schema="__migration__"; echo $? >&3; } | base64 -w 0 >&4; } 3>&1 | { read EXIT_CODE; exit "$EXIT_CODE"; }; } 4>&1'$cmd$ DELIMITER E'\1';
SELECT line FROM (SELECT regexp_split_to_table(convert_from(decode(line, 'base64'), 'UTF8'), E'\n') AS line FROM input) _
  WHERE line NOT LIKE '-- %' AND line NOT LIKE '--' AND line != '' -- Skip comments and empty lines
  AND line NOT SIMILAR TO '(CREATE|ALTER|DROP) ROLE(| IF EXISTS) (postgres|replicator|authenticator)%' -- Skip SGCluster existing roles
  AND line NOT SIMILAR TO '(DROP|CREATE) EXTENSION(| IF EXISTS| IF NOT EXISTS) (dblink|postgis)(;| %)'
  AND line NOT SIMILAR TO 'COMMENT ON EXTENSION (dblink|postgis) %'
  AND line NOT SIMILAR TO '% SET "sgstream.ddl_import_completed" TO ''true'';'
  ;
EOF
  )"
    run_query -p 5432 -i "0" -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -q "$QUERY" | grep -v '^COPY ' > "$LOG_PATH/expected-schema"
    run_query -p 5432 -i "0" -h "$TARGET_CLUSTER_NAME" -c "$TARGET_CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -q "$QUERY" | grep -v '^COPY ' > "$LOG_PATH/actual-schema"
    if diff "$LOG_PATH/expected-schema" "$LOG_PATH/actual-schema"
    then
      success "schema was migrated successfully"
    else
      fail "schema was not migrated successfully"
    fi
  fi
}

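# Types whose array companion column is skipped in the complex_* tables,
# presumably because their array form cannot be replicated faithfully.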
is_array_not_supported() {
  local TYPE_CATEGORY="$1"
  local TYPE_TYPE="$2"
  local TYPE_NAME="$3"
  local TYPE_FORMAT_TYPE="$4"
  [ "${TYPE_NAME%serial}" != "$TYPE_NAME" ] \
    || [ "$TYPE_NAME" = 'aclitem' ] \
    || [ "$TYPE_NAME" = 'point' ] \
    || [ "$TYPE_NAME" = 'money' ] \
    || [ "$TYPE_NAME" = 'interval' ] \
    || [ "$TYPE_NAME" = 'name' ] \
    || [ "$TYPE_NAME" = 'varbit' ] \
    || [ "$TYPE_NAME" = 'xml' ] \
    || [ "$TYPE_NAME" = 'jsonpath' ] \
    || [ "$TYPE_NAME" = 'bytea' ] \
    || [ "$TYPE_NAME" = 'time' ] \
    || [ "$TYPE_NAME" = 'timetz' ] \
    || [ "$TYPE_NAME" = 'bit' ]
}

check_stream_copy_schema_is_working() {
  wait_until check_sakila_database "$CLUSTER_NAME"

  cat << EOF | tee "$LOG_PATH/sgstream-copy-schema-working.yaml" | kubectl replace --force -f -
apiVersion: stackgres.io/v1alpha1
kind: SGStream
metadata:
  namespace: $CLUSTER_NAMESPACE
  name: "$STREAM_NAME"
spec:
  maxRetries: 0
  source:
    type: SGCluster
    sgCluster:
      name: "$CLUSTER_NAME"
      database: sakila
      debeziumProperties:
        snapshotSelectStatementOverrides:
          public.payment: "SELECT * FROM ONLY payment"
  target:
    type: SGCluster
    sgCluster:
      name: "$TARGET_CLUSTER_NAME"
      database: sakila
      debeziumProperties:
  pods:
    persistentVolume:
      size: 1Gi
  debeziumEngineProperties:
EOF

  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.snapshot.snapshotCompleted | grep -qxF true'
  then
    success "snapshot completed"
  else
    fail "snapshot did not completed"
  fi

  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.connected | grep -qxF true'
  then
    success "streaming started"
  else
    fail "streaming not started"
  fi

  kubectl annotate sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" debezium-signal.stackgres.io/tombstone=

  if kubectl wait --timeout="${E2E_TIMEOUT}s" sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" --for=condition=Completed
  then
    success "stream has completed"
  else
    fail "stream has not completed"
  fi

  local SCHEMA_QUERY
  SCHEMA_QUERY="$(cat << 'EOF'
DROP TABLE IF EXISTS input; CREATE TEMPORARY TABLE input (line text);
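-- Same COPY FROM PROGRAM + base64 + exit-code propagation trick as in
-- check_stream_is_working.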
COPY input FROM PROGRAM $cmd$sh -c '{ { { pg_dumpall --clean --if-exists --roles-only; echo $? >&3; } | base64 -w 0 >&4; } 3>&1 | { read EXIT_CODE; exit "$EXIT_CODE"; }; } 4>&1'$cmd$ DELIMITER E'\1';
COPY input FROM PROGRAM $cmd$sh -c '{ { { pg_dump --clean --if-exists --schema-only --dbname=sakila --exclude-table="(test|spatial_ref_sys)" --exclude-schema="__migration__" --no-publications --no-subscriptions; echo $? >&3; } | base64 -w 0 >&4; } 3>&1 | { read EXIT_CODE; exit "$EXIT_CODE"; }; } 4>&1'$cmd$ DELIMITER E'\1';
SELECT line FROM (SELECT regexp_split_to_table(convert_from(decode(line, 'base64'), 'UTF8'), E'\n') AS line FROM input) _
  WHERE line NOT LIKE '-- %' AND line NOT LIKE '--' AND line != '' -- Skip comments and empty lines
  AND line NOT SIMILAR TO '(CREATE|ALTER|DROP) ROLE(| IF EXISTS) (postgres|replicator|authenticator)%' -- Skip SGCluster existing roles
  AND line NOT SIMILAR TO '(DROP|CREATE) EXTENSION(| IF EXISTS| IF NOT EXISTS) (dblink|postgis)(;| %)'
  AND line NOT SIMILAR TO 'COMMENT ON EXTENSION (dblink|postgis) %'
  AND line NOT SIMILAR TO '% SET "sgstream.ddl_import_completed" TO ''true'';'
  ;
EOF
)"
  run_query -p 5432 -i "0" -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -q "$SCHEMA_QUERY" > "$LOG_PATH/copy-schema-expected-schema"
  run_query -p 5432 -i "0" -h "$TARGET_CLUSTER_NAME" -c "$TARGET_CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -q "$SCHEMA_QUERY" > "$LOG_PATH/copy-schema-actual-schema"
  if diff "$LOG_PATH/copy-schema-expected-schema" "$LOG_PATH/copy-schema-actual-schema"
  then
    success "sakila schema was migrated successfully"
  else
    fail "sakila schema was not migrated successfully"
  fi

  local DATA_QUERY
  DATA_QUERY="$(cat << 'EOF'
DO
$$
DECLARE
    rec RECORD;
    count text;
    hash text;
BEGIN
    -- Loop through all user tables in the public schema
    FOR rec IN
        SELECT
            table_schema,
            table_name
        FROM
            information_schema.tables
        WHERE table_schema IN ('public') AND table_name NOT IN ('test', 'spatial_ref_sys')
        ORDER BY table_schema, table_name
    LOOP
        -- Compute the row count and an order-independent content hash
        -- (XOR of per-row md5 halves) for the table
        EXECUTE 'SELECT count(*)::text AS count, '
          || 'md5(bit_xor(((''x'' || left(md5(' || quote_ident(rec.table_name) || '::text), 16))::bit(64))'
          || ' # ((''x'' || right(md5(' || quote_ident(rec.table_name) || '::text), 16))::bit(64)))::text) AS hash'
          || ' FROM ' || quote_ident(rec.table_schema) || '.' || quote_ident(rec.table_name) INTO count, hash;
        RAISE NOTICE '%: count:%, hash:%', quote_ident(rec.table_schema) || '.' || quote_ident(rec.table_name), count, hash;
    END LOOP;
END
$$;
EOF
)"
  run_query -p 5432 -i "0" -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -q "$DATA_QUERY" | sort > "$LOG_PATH/copy-schema-expected-data"
  run_query -p 5432 -i "0" -h "$TARGET_CLUSTER_NAME" -c "$TARGET_CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -q "$DATA_QUERY" | sort > "$LOG_PATH/copy-schema-actual-data"
  if diff "$LOG_PATH/copy-schema-expected-data" "$LOG_PATH/copy-schema-actual-data"
  then
    success "sakila data was migrated successfully"
  else
    fail "sakila data was not migrated successfully"
  fi
}

check_sakila_database() {
  local CLUSTER_NAME="$1"
  check_user "$CLUSTER_NAME" 0
  check_database "$CLUSTER_NAME" 0
  check_schema "$CLUSTER_NAME" 0
}

check_stream_incremental_snapshots_is_working() {
  kubectl exec -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c patroni -- \
    createdb pgbench
  kubectl exec -n "$CLUSTER_NAMESPACE" "$TARGET_CLUSTER_NAME-0" -c patroni -- \
    createdb pgbench
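  # Build the pgbench schema by hand instead of using pgbench -i so every
  # table, including pgbench_history, gets a primary key, presumably so the
  # target side can apply replicated events by key.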
cat << 'EOF' > "$LOG_PATH/pgbench.sql"
DROP TABLE IF EXISTS public.pgbench_branches;
DROP TABLE IF EXISTS public.pgbench_tellers;
DROP TABLE IF EXISTS public.pgbench_accounts;
DROP TABLE IF EXISTS public.pgbench_history;

CREATE TABLE public.pgbench_branches (
    bid integer NOT NULL,
    bbalance integer,
    filler character(88)
)
WITH (fillfactor='100');
CREATE TABLE public.pgbench_tellers (
    bid integer,
    tid integer NOT NULL,
    tbalance integer,
    filler character(84)
)
WITH (fillfactor='100');
CREATE TABLE public.pgbench_accounts (
    bid integer,
    aid integer NOT NULL,
    abalance integer,
    filler character(84)
)
WITH (fillfactor='100');
CREATE TABLE public.pgbench_history (
    bid integer,
    tid integer,
    aid integer,
    delta integer,
    mtime timestamp without time zone,
    filler character(22)
);

ALTER TABLE ONLY public.pgbench_branches
    ADD CONSTRAINT pgbench_branches_pkey PRIMARY KEY (bid);
ALTER TABLE ONLY public.pgbench_tellers
    ADD CONSTRAINT pgbench_tellers_pkey PRIMARY KEY (tid);
ALTER TABLE ONLY public.pgbench_accounts
    ADD CONSTRAINT pgbench_accounts_pkey PRIMARY KEY (aid);
ALTER TABLE ONLY public.pgbench_history
    ADD CONSTRAINT pgbench_history_pkey PRIMARY KEY (bid, tid, aid);

INSERT INTO pgbench_branches (bid, bbalance)
  SELECT i, 0 FROM generate_series(1, 7) AS i;
INSERT INTO pgbench_tellers (tid, bid, tbalance)
  SELECT i, 1 + ((i - 1) / 10), 0 FROM generate_series(1, 7 * 10) AS i;
INSERT INTO pgbench_accounts (aid, bid, abalance)
  SELECT i, 1 + ((i - 1) / 100000), 0 FROM generate_series(1, 7 * 100000) AS i;
EOF
  cat "$LOG_PATH/pgbench.sql" \
    | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c patroni -- \
      psql -d pgbench

  kubectl exec -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c patroni -- \
    pgbench -d pgbench -s 100 -c 4 -j 4 -T "$(( E2E_TIMEOUT * 10 ))" \
    > "$LOG_PATH/pgbench.log" 2>&1 &
  echo "$!" > "$LOG_PATH/pgbench.pid"
  trap_kill "$(cat "$LOG_PATH/pgbench.pid")"

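  # Debezium signaling table: the conventional id/type/data column layout.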
  cat << 'EOF' | tee "$LOG_PATH/create-debezium-signal-table.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on -d pgbench
  CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
EOF

  cat << EOF | tee "$LOG_PATH/sgstream-incremental-snapshot-working.yaml" | kubectl replace --force -f -
apiVersion: stackgres.io/v1alpha1
kind: SGStream
metadata:
  namespace: $CLUSTER_NAMESPACE
  name: "$STREAM_NAME"
spec:
  maxRetries: 0
  source:
    type: SGCluster
    sgCluster:
      name: "$CLUSTER_NAME"
      database: pgbench
      debeziumProperties:
        snapshotMode: no_data
        signalEnabledChannels: [source]
        signalDataCollection: public.debezium_signal
  target:
    type: SGCluster
    sgCluster:
      name: "$TARGET_CLUSTER_NAME"
      database: pgbench
      skipDropPrimaryKeys: true
      skipRestoreIndexesAfterSnapshot: true
      debeziumProperties:
        detectInsertMode: false
        removePlaceholders: false
  pods:
    persistentVolume:
      size: 1Gi
  debeziumEngineProperties:
EOF

  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.connected | grep -qxF true'
  then
    success "streaming started"
  else
    fail "streaming not started"
  fi

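  # Trigger an incremental snapshot of every public table except the signal
  # table itself, while pgbench keeps writing in the background.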
  cat << 'EOF' | tee "$LOG_PATH/signal-start-incremental-snapshots.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on -d pgbench
INSERT INTO debezium_signal VALUES (now()::text, 'execute-snapshot', '{"data-collections": [' || (SELECT string_agg('"' || nspname || '.' || relname || '"', ',') FROM pg_class LEFT JOIN pg_namespace ON (pg_class.relnamespace = pg_namespace.oid) WHERE nspname = 'public' AND relkind = 'r' AND relname NOT IN ('debezium_signal')) || '], "type":"incremental"}');
EOF

  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.snapshot.snapshotCompleted | grep -qxF true'
  then
    success "snapshot completed"
  else
    fail "snapshot did not completed"
  fi

  kill "$LOG_PATH/pgbench.pid" || true

  cat << 'EOF' | tee "$LOG_PATH/signal-tombstone.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on -d pgbench
INSERT INTO debezium_signal VALUES (now()::text, 'tombstone', '{}');
EOF

  if kubectl wait --timeout="${E2E_TIMEOUT}s" sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" --for=condition=Completed
  then
    success "stream has completed"
  else
    fail "stream has not completed"
  fi

  local SCHEMA_QUERY
  SCHEMA_QUERY="$(cat << 'EOF'
DROP TABLE IF EXISTS input; CREATE TEMPORARY TABLE input (line text);
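-- Same COPY FROM PROGRAM + base64 + exit-code propagation trick as in
-- check_stream_is_working.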
COPY input FROM PROGRAM $cmd$sh -c '{ { { pg_dumpall --clean --if-exists --roles-only; echo $? >&3; } | base64 -w 0 >&4; } 3>&1 | { read EXIT_CODE; exit "$EXIT_CODE"; }; } 4>&1'$cmd$ DELIMITER E'\1';
COPY input FROM PROGRAM $cmd$sh -c '{ { { pg_dump --clean --if-exists --schema-only --dbname=pgbench --exclude-table="(test|spatial_ref_sys|debezium_signal)" --exclude-schema="__migration__" --no-publications --no-subscriptions; echo $? >&3; } | base64 -w 0 >&4; } 3>&1 | { read EXIT_CODE; exit "$EXIT_CODE"; }; } 4>&1'$cmd$ DELIMITER E'\1';
SELECT line FROM (SELECT regexp_split_to_table(convert_from(decode(line, 'base64'), 'UTF8'), E'\n') AS line FROM input) _
  WHERE line NOT LIKE '-- %' AND line NOT LIKE '--' AND line != '' -- Skip comments and empty lines
  AND line NOT SIMILAR TO '(CREATE|ALTER|DROP) ROLE(| IF EXISTS) (postgres|replicator|authenticator)%' -- Skip SGCluster existing roles
  AND line NOT SIMILAR TO '(DROP|CREATE) EXTENSION(| IF EXISTS| IF NOT EXISTS) (dblink|postgis)(;| %)'
  AND line NOT SIMILAR TO 'COMMENT ON EXTENSION (dblink|postgis) %'
  AND line NOT SIMILAR TO '% SET "sgstream.ddl_import_completed" TO ''true'';'
  ;
EOF
)"
  run_query -p 5432 -i "0" -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -q "$SCHEMA_QUERY" > "$LOG_PATH/incremental-snapshot-expected-schema"
  run_query -p 5432 -i "0" -h "$TARGET_CLUSTER_NAME" -c "$TARGET_CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -q "$SCHEMA_QUERY" > "$LOG_PATH/incremental-snapshot-actual-schema"
  if diff "$LOG_PATH/incremental-snapshot-expected-schema" "$LOG_PATH/incremental-snapshot-actual-schema"
  then
    success "pgbench schema was migrated successfully"
  else
    fail "pgbench schema was not migrated successfully"
  fi

  local DATA_QUERY
  DATA_QUERY="$(cat << 'EOF'
DO
$$
DECLARE
    rec RECORD;
    count text;
    hash text;
BEGIN
    -- Loop through all user tables in the public schema
    FOR rec IN
        SELECT
            table_schema,
            table_name
        FROM
            information_schema.tables
        WHERE table_schema IN ('public') AND table_name NOT IN ('debezium_signal')
        ORDER BY table_schema, table_name
    LOOP
        -- Compute the row count and an order-independent content hash
        -- (XOR of per-row md5 halves) for the table
        EXECUTE 'SELECT count(*)::text AS count, '
          || 'md5(bit_xor(((''x'' || left(md5(' || quote_ident(rec.table_name) || '::text), 16))::bit(64))'
          || ' # ((''x'' || right(md5(' || quote_ident(rec.table_name) || '::text), 16))::bit(64)))::text) AS hash'
          || ' FROM ' || quote_ident(rec.table_schema) || '.' || quote_ident(rec.table_name) INTO count, hash;
        RAISE NOTICE '%: count:%, hash:%', quote_ident(rec.table_schema) || '.' || quote_ident(rec.table_name), count, hash;
    END LOOP;
END
$$;
EOF
)"
  run_query -p 5432 -i "0" -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -q "$DATA_QUERY" | sort > "$LOG_PATH/incremental-snapshot-expected-data"
  run_query -p 5432 -i "0" -h "$TARGET_CLUSTER_NAME" -c "$TARGET_CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -q "$DATA_QUERY" | sort > "$LOG_PATH/incremental-snapshot-actual-data"
  if diff "$LOG_PATH/incremental-snapshot-expected-data" "$LOG_PATH/incremental-snapshot-actual-data" > "$LOG_PATH/incremental-snapshot-data-diff"
  then
    success "pgbench data was migrated successfully"
  else
    fail "pgbench data was not migrated successfully"
  fi
}
